Tour of Beam をやる
Introduction
Beam のコンセプトについて
Runner とは
pipeline と runner という概念がある
pipeline は portability がある
runner はたとえは Google Clou Dataflow 用の runner とかそういうの
Direct runner
Direct runner はあなたの PC で pipeline を実行する。通常の Runner は、データを並列に処理したりスケーリングしたりして効率よくデータを処理するが、Direct runner はそういったことはしない。単に、正しく Apache Beam のコードが書かれているかチェックするためのものである
Apache Beam にはいくつかの制約があり、例えば、次の観点をチェックする
要素(elements = 処理するデータ)の immutability(不変性)
要素がエンコード可能であること
要素が任意の順番で処理されても問題ないこと
DoFn, CombineFn などによってユーザー定義された関数が、シリアライズ可能であること
Apache Beam はデータを分割して並列処理したりするため、そのための制約事項がいくつかある
Direct runner は、テストは開発で利用し、どんな Beam runner を使ってもパイプラインが正常に実行されることを確認するためのものである。
Remote cluster (= Google Cloud Dataflow などの実際の実行環境)でパイプラインが失敗した時に、デバッグする際にもこの Direct runner が使える(non-trivial = 重要である)。ローカルで単体テストする方が簡単で速いし、各種デバッグツールも使える。
Specify your dependency
ビルドツール maven を使ってdirect-runner を使う場合の
Set runner
code:sh
--runner=DirectRunner
Google Cloud Dataflow Runner
Google Cloud Dataflow で実行したい場合は、 Google Cloud Storage にコードをアップロードして、Dataflow job を実行できる。
Apache Flink Runner
Apalche Flink というのを使っても動かせる
Apache Spart Runner
Sazma Runner
Nemo Runner
Jet Runner
...
Pipeline の概要
Beam を使うためには、まず pipeline を定義する。
pipeline は input, transform, output などを含む。
実行時オプションも設定する。実行時オプションは通常はコマンドラインで実行する時のオプションとして渡される。
Beam にはいくつかの抽象化されたもの(abstract クラス等)が用意されており、データ処理がスケールするように設計されている
Beam は、バッチ処理でも、ストリーミングデーター処理でも動作する。
Beam の pipeline を実装するために、いくつかの用語を知っておく必要がある
Pipeline
データ処理の全体を表す。データ処理の開始から終了までのこと。
pipeline には、inputデータの読み込み、データの変換、outputへの書き込みが含まれる。
PCollection
PCollection は Beam pipeline が処理すべきデータセットを表す
データセットは bonded なものと、unbounded なものがある
bounded とは、決まった量のデータであること
unbounded とは、Pub/Sub のサブスクリプションのように、リアルタイムに継続的にデータが届くこと
Pileline は通常、どこか外部のデータを読み込んで PCollection をつくるところから始まる
PCollection は、Pipeline の各「ステップ」の input と output となる
PTransform
PTransform はパイプラインの中のデータ処理のステップ
すべての PTransform は1つ以上の PCollection を受取り、データ処理を行い、0個以上の PCollection を返す
I/O transforms
外部のストレージなどからデータを読み込む PTransform
一般的に Beam はこのように動作する
Pipeline オブジェクトを作成する
最初の PCollection を作成する。外部のストレージからデータを読み込んできたりする
PTransform を PCollection に Apply (適用)する
Transform は、PCollection に対して filter, group などを実行したり、PCollection の中の各要素(elements)に対して処理を実行したりする。
Transform は、input の PCollection を直接編集するのではなく、新しい PCollection オブジェクトを output する。pipeline はは一直線の処理とは限らないためである。
PCollection は変数、PTransform は関数と考えると良い
最後に、結果を書き込むために IO を行う
Pipeline Runner を使って pipeline を実行する
パイプラインを作成する
code:java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.transforms.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Task {
private static final Logger LOG = LoggerFactory.getLogger(Task.class);
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
Pipeline pipeline = Pipeline.create(options);
PCollection<String> output = createPCollection(pipeline);
output.apply("Log", ParDo.of(new LogOutput<String>()));
pipeline.run();
}
static PCollection<String> createPCollection(Pipeline pipeline) {
return pipeline.apply(Create.of("Hello Beam"));
}
static class LogOutput<T> extends DoFn<T, T> {
private String prefix;
LogOutput() {
this.prefix = "Processing element";
}
LogOutput(String prefix) {
this.prefix = prefix;
}
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
LOG.info(prefix + ": {}", c.element());
}
}
}
Pipeline options
List of Beam Terms (Apache Beam 用語集)
Pipeline
PCollection
PTransform
Aggregation
1つ以上の input elements から計算される値のこと
User-defined function (UDF)
いくつかの Beam の operation は、ユーザー定義のコードを実行できる。
Schema
A schema is a language-independent type definition for a PCollection. The schema for a PCollection defines elements of that PCollection as an ordered list of named fields.
SDK
Runner
Window
PCollection は element の timestamp によっていくつかの window 二分割される
Windows enable grouping operations over collections that grow over time by dividing the collection into windows of finite collections.
Watermark
A watermark is a guess as to when all data in a certain window is expected to have arrived. This is needed because data isn’t always guaranteed to arrive in a pipeline in event time order, or to always arrive at predictable intervals.
Trigger
いつ window のデータを aggregate するかを決定するトリガー
State and timers
Per-key state and timer callbacks are lower level primitives that give you full control over aggregating input collections that grow over time.
Splittable DoFn
Splittable DoFns let you process elements in a non-monolithic way. You can checkpoint the processing of an element, and the runner can split the remaining work to yield additional parallelism.
Common Transform
Filter
Aggregations
Count
Sum
Mean
Min
Max
WithKeys
PCollection<V> から PCollection<KV<K, V>> に変換する処理
Core Transforms
Map
PerDo one-to-one
PerDo とは
Parallel Do のこと
ParDo に DoFn を渡す
code:java
// The input PCollection of Strings.
PCollection<String> input = ...;
// The DoFn to perform on each element in the input PCollection.
static class ComputeWordLengthFn extends DoFn<String, Integer> { ... }
// Apply a ParDo to the PCollection "input" to compute lengths for each word.
PCollection<Integer> wordLengths = input.apply(
ParDo
.of(new ComputeWordLengthFn())); // The DoFn to perform on each element, which
// we define above.
DoFn の中では @ProcessElement というアノテーションが使える
@Timestamp, @Element, PaneInfo, PipelineOptions などを引数に受け取ることもできる
Side-inputs example
Side input とは、 DoFn が PCillection の各 element を処理する時に利用できる値
ParDo 野中の DoFn が、各 element を処理する時に、他のデータを side input として使いたい場合は、そのデータの view を作ることになる。
side inoput は input の PCollection を処理する時に、他の情報を使いたい場合に使う
しかし、side input のデータは runtime 時に決定する必要があります(ハードコードすることはできない)。
例えば、パイプラインのほかの部分の値を side input として使うなど
side input は withSideInputs で渡す
PCollection はもしかしたら無限の場合もある(streaming job の場合)
そこで、ある window の PCollection に対して PCollectionView を作成する。